package com.yxsk.relay.job.endpoint.combine.config;

import com.yxsk.relay.job.component.common.net.NetServer;
import com.yxsk.relay.job.component.common.net.NettyServer;
import com.yxsk.relay.job.component.common.net.handler.HttpServerHandler;
import com.yxsk.relay.job.component.common.net.handler.convert.CompressAbleMessageConvert;
import com.yxsk.relay.job.component.common.net.handler.convert.NetMessageConvert;
import com.yxsk.relay.job.component.common.net.handler.exception.GlobalNetServerExceptionHandler;
import com.yxsk.relay.job.component.common.net.handler.interceptor.RpcHandleInterceptor;
import com.yxsk.relay.job.component.common.net.handler.message.MessageServerHandler;
import com.yxsk.relay.job.component.common.net.pipeline.HttpServerPipelineFactory;
import com.yxsk.relay.job.component.common.net.serialization.JsonMessageEntitySerializer;
import com.yxsk.relay.job.component.common.protocol.caller.DefaultHttpClientFactory;
import com.yxsk.relay.job.component.common.protocol.caller.HttpRestfulRemoteCaller;
import com.yxsk.relay.job.component.common.protocol.caller.RemoteCaller;
import com.yxsk.relay.job.component.endpoint.config.RelayJobAdminConfig;
import com.yxsk.relay.job.component.endpoint.config.RelayJobSlaveConfig;
import com.yxsk.relay.job.component.endpoint.context.RelayJobNetContextHolder;
import com.yxsk.relay.job.component.endpoint.facade.DefaultJobHeadman;
import com.yxsk.relay.job.component.endpoint.facade.JobHeadman;
import com.yxsk.relay.job.component.endpoint.facade.net.RelayJobEndpointFacadeHandler;
import com.yxsk.relay.job.component.endpoint.log.stream.LocalDiskLogStream;
import com.yxsk.relay.job.component.endpoint.log.stream.LogStream;
import com.yxsk.relay.job.component.endpoint.net.HttpRestfulRpcHandleInterceptor;
import com.yxsk.relay.job.component.endpoint.net.RestfulGlobalNetExceptionHandler;
import org.apache.commons.lang3.StringUtils;
import org.jboss.netty.channel.ChannelPipelineFactory;

import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;

/**
 * Assembles and starts the network server of a relay-job endpoint.
 *
 * <p>The server is a {@link NettyServer} listening on the port returned by
 * {@link RelayJobSlaveConfig#getPort()}. Its pipeline is an {@link HttpServerPipelineFactory}
 * wrapping an {@link HttpServerHandler} that is composed from an exception handler, a list of
 * RPC interceptors, a message converter and a {@link RelayJobEndpointFacadeHandler} backed by a
 * {@link DefaultJobHeadman}.
 *
 * <p>Created: 2019/9/9 10:46
 *
 * @author 11376
 */
public class NetServerConfig extends EndpointCombineAutoConfig {

    /** Slave-side settings; only the listening port is read by this class. */
    private final RelayJobSlaveConfig slaveConfig;

    /** Admin-side settings, handed to the job headman. */
    private final RelayJobAdminConfig adminConfig;

    /** Log stream handed to the job headman. */
    private final LogStream logStream;

    /** The server instance; created by {@code init()} and started by {@code autoConfig()}. */
    private NetServer netServer;

    /**
     * Stores the collaborators needed to build the server. Nothing is created or started here;
     * that happens in {@link #autoConfig()}.
     *
     * @param adminConfig admin configuration passed on to the job headman
     * @param slaveConfig slave configuration supplying the listening port
     * @param logStream   log stream passed on to the job headman
     */
    public NetServerConfig(RelayJobAdminConfig adminConfig, RelayJobSlaveConfig slaveConfig, LogStream logStream) {
        this.adminConfig = adminConfig;
        this.slaveConfig = slaveConfig;
        this.logStream = logStream;
    }

    /**
     * Builds the network server and starts it.
     */
    // NOTE(review): presumably implements a hook declared by EndpointCombineAutoConfig (not
    // visible from this file) - confirm and add @Override for consistency with componentName().
    protected void autoConfig() {
        this.init();
        this.netServer.start();
    }

    /**
     * @return the display name of this component
     */
    @Override
    protected String componentName() {
        return "Relay job network service";
    }

    /**
     * Creates the server instance without starting it.
     */
    private void init() {
        // Use the Netty-based network server implementation.
        this.netServer = new NettyServer(createChannelPipelineFactory(), this.slaveConfig.getPort());
    }

    /**
     * @return an HTTP pipeline factory wrapping a freshly built server handler
     */
    private ChannelPipelineFactory createChannelPipelineFactory() {
        return new HttpServerPipelineFactory(createServerHandler());
    }

    /**
     * Builds the HTTP server handler from its four parts: exception handler, interceptors,
     * message converter and message server handler.
     *
     * @return the assembled HTTP server handler
     */
    private HttpServerHandler createServerHandler() {
        return HttpServerHandler.builder()
                .exceptionHandler(exceptionHandler())
                .interceptors(interceptors())
                .messageConvert(messageConvert())
                .messageServerHandler(serverHandler())
                .build();
    }

    /**
     * @return the RESTful global exception handler used by the server handler
     */
    private GlobalNetServerExceptionHandler exceptionHandler() {
        return new RestfulGlobalNetExceptionHandler();
    }

    /**
     * Builds the interceptor list. The RESTful interceptor is added before the request-context
     * interceptor; the list preserves that insertion order.
     *
     * @return a new mutable list holding the two interceptors
     */
    private List<RpcHandleInterceptor> interceptors() {
        List<RpcHandleInterceptor> handleInterceptors = new LinkedList<>();
        handleInterceptors.add(new HttpRestfulRpcHandleInterceptor());
        handleInterceptors.add(new RelayJobNetContextHolder.NetRequestContextInterceptor());
        return handleInterceptors;
    }

    /**
     * @return a new compress-able message converter for the server side
     */
    private NetMessageConvert messageConvert() {
        return new CompressAbleMessageConvert();
    }

    /**
     * @return the endpoint facade handler, backed by a new job headman and a JSON serializer
     */
    private MessageServerHandler serverHandler() {
        return new RelayJobEndpointFacadeHandler(headman(),
                new JsonMessageEntitySerializer());
    }

    /**
     * Builds the job headman and attaches its result callback.
     *
     * @return a fully wired {@link DefaultJobHeadman}
     */
    private JobHeadman headman() {
        DefaultJobHeadman headman = DefaultJobHeadman.builder()
                .adminConfig(this.adminConfig)
                .logStream(this.logStream)
                .remoteCaller(remoteCaller())
                // One worker thread per available processor. This class keeps no reference to
                // the pool, so it cannot shut it down itself.
                // NOTE(review): verify that DefaultJobHeadman (or its owner) shuts the executor
                // down; otherwise its threads outlive the endpoint.
                .taskExecutor(Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors()))
                .tasks(new ConcurrentHashMap<>(16, 0.75f))
                .build();
        // JobResultCallback is an inner class of DefaultJobHeadman, so it can only be created
        // once the headman instance exists; hence the two-step wiring.
        headman.setResultCallback(headman.new JobResultCallback());
        return headman;
    }

    /**
     * Builds the caller the headman uses for outbound requests. It gets its own converter and
     * serializer instances rather than sharing the server-side ones.
     *
     * @return an HTTP RESTful remote caller
     */
    private RemoteCaller remoteCaller() {
        return HttpRestfulRemoteCaller.builder()
                .httpClientFactory(new DefaultHttpClientFactory())
                .messageConvert(new CompressAbleMessageConvert())
                .serializer(new JsonMessageEntitySerializer())
                .build();
    }

}
